Flappy Bird with AI1. 用class將所有遊戲物件獨立包裝2. 加入大腦和作簡單測試3. 輸入有意義的inputs
4. 製作一個世代5. 基因演算法(初始化、評估和選擇)6. 基因演算法(交叉基因)7. 基因演算法(變異)8. 考考你
首先第一步是要修改遊戲, 將bird
用class包裝起來,可以一次過在同一遊戲畫面,有後多鳥在遊玩。
例如下面的遊戲,我就修改成雙打,可以用spacebar
和w
鍵,分別控制兩隻鳥。
flappy_bird.pyde
:
x1from Stuffs import *
2
3panSpeed = 5
4birds = []
5myPipes = []
6
7def setup():
8 global birds, myPipes
9 size(800, 600)
10 frameRate(60)
11 birds = [Bird(), Bird()]
12 myPipes = [Pipe()]
13
14def draw():
15 global birds, myPipes
16
17 if (frameCount % 100 == 0):
18 myPipes.append(Pipe())
19 if (myPipes[0].x < -myPipes[0].w):
20 myPipes.pop(0)
21
22 background("#70C6D5")
23 for bird in birds:
24 bird.update()
25 bird.display()
26 print(bird.collide(myPipes[0]))
27
28 for pipe in myPipes:
29 pipe.update(panSpeed)
30 pipe.display()
31
32def keyPressed():
33 if (key == ' '):
34 birds[0].jump()
35 if (key == 'W' or key == 'w'):
36 birds[1].jump()
37 if (key == 'R' or key == 'r'):
38 setup()
stuffs.py
:
xxxxxxxxxx
491class Bird:
2 def __init__(self):
3 self.score = 0
4 self.pipeCounter = 0
5 self.birdPos = PVector(50, height/2)
6 self.birdVec = PVector(0, 0)
7 self.birdAcc = PVector(0, 0.3)
8 self.isPass = False
9
10 def update(self):
11 self.birdVec.add(self.birdAcc)
12 self.birdPos.add(self.birdVec)
13 self.score += 1
14
15 def jump(self):
16 self.birdVec.y = -8
17
18 def display(self):
19 fill("#D5BB06")
20 ellipse(self.birdPos.x, self.birdPos.y, 25, 25)
21
22 def collide(self, _pipe):
23 R = 25 / 2
24 X = self.birdPos.x
25 Y = self.birdPos.y
26 if (X + R > _pipe.x and X - R < _pipe.x + _pipe.w):
27 if (Y + R > _pipe.y or Y - R < _pipe.y - _pipe.gap):
28 return True
29 if (Y > height):
30 return True
31 return False
32
33class Pipe:
34 x = y = h = 0
35 w = 80
36 gap = 80
37
38 def __init__(self):
39 self.x = width + self.w
40 self.y = random(100, height - 100)
41 self.h = height
42
43 def update(self, _panSpeed):
44 self.x -= _panSpeed
45
46 def display(self):
47 fill(0, 204, 0)
48 rect(self.x, self.y, self.w, self.h)
49 rect(self.x, self.y - self.gap, self.w, -self.h)
從Toy Neural Network將Matrix.py
和nn.py
複製到這個項目中,像下圖,你的項目中應該有4個頁面。
下面的程式,是參考The Coding Train的Neuroevolution Flappy Bird,我將其轉成了Processing for Python版本,你可以參考一下他的影片。
Stuffs.py
:
xxxxxxxxxx
581from nn import NeuralNetwork
2
3class Bird:
4 def __init__(self):
5 self.score = 0
6 self.pipeCounter = 0
7 self.birdPos = PVector(50, height/2)
8 self.birdVec = PVector(0, 0)
9 self.birdAcc = PVector(0, 0.3)
10 self.isPass = False
11 self.brain = NeuralNetwork(4, 4, 1)
12
13 def think(self, pipes):
14 inputs = [1.0, 0.5, 0.2, 0.3]
15 output = self.brain.predict(inputs)
16 if output[0] > 0.5:
17 self.jump()
18
19 def update(self):
20 self.birdVec.add(self.birdAcc)
21 self.birdPos.add(self.birdVec)
22 self.score += 1
23
24 def jump(self):
25 self.birdVec.y = -8
26
27 def display(self):
28 fill("#D5BB06")
29 ellipse(self.birdPos.x, self.birdPos.y, 25, 25)
30
31 def collide(self, _pipe):
32 R = 25 / 2
33 X = self.birdPos.x
34 Y = self.birdPos.y
35 if (X + R > _pipe.x and X - R < _pipe.x + _pipe.w):
36 if (Y + R > _pipe.y or Y - R < _pipe.y - _pipe.gap):
37 return True
38 if (Y > height):
39 return True
40 return False
41
42class Pipe:
43 x = y = h = 0
44 w = 80
45 gap = 80
46
47 def __init__(self):
48 self.x = width + self.w
49 self.y = random(100, height - 100)
50 self.h = height
51
52 def update(self, _panSpeed):
53 self.x -= _panSpeed
54
55 def display(self):
56 fill(0, 204, 0)
57 rect(self.x, self.y, self.w, self.h)
58 rect(self.x, self.y - self.gap, self.w, -self.h)
flappy_bird.pyde
:
xxxxxxxxxx
391from Stuffs import *
2
3panSpeed = 5
4birds = []
5myPipes = []
6
7def setup():
8 global birds, myPipes
9 size(800, 600)
10 frameRate(60)
11 birds = [Bird(), Bird()]
12 myPipes = [Pipe()]
13
14def draw():
15 global birds, myPipes
16
17 if (frameCount % 100 == 0):
18 myPipes.append(Pipe())
19 if (myPipes[0].x < -myPipes[0].w):
20 myPipes.pop(0)
21
22 background("#70C6D5")
23 for bird in birds:
24 bird.think(myPipes)
25 bird.update()
26 bird.display()
27 print(bird.collide(myPipes[0]))
28
29 for pipe in myPipes:
30 pipe.update(panSpeed)
31 pipe.display()
32
33def keyPressed():
34 if (key == ' '):
35 birds[0].jump()
36 if (key == 'W' or key == 'w'):
37 birds[1].jump()
38 if (key == 'R' or key == 'r'):
39 setup()
加入後,首先在stuffs.py
中,一開始導入from nn import NeuralNetwork
。
之後在bird
class中,加入self.brain = NeuralNetwork(4, 4, 1)
,做一個4個輸入, 4個隱藏層和1個輸出的神經網路。
之後,在bird
中加入:
xxxxxxxxxx
51 def think(self, pipes):
2 inputs = [1.0, 0.5, 0.2, 0.3]
3 output = self.brain.predict(inputs)
4 if output[0] > 0.5:
5 self.jump()
首先要測試一下,NeuralNetwork
的class是否真的能導入到這個program中,所以先固定開4個inputs
給它,再在無訓練的情況下,用predict()
產生output
,只要output
大於0.5
就jump()
。
最後,在主程式的draw()
中,
xxxxxxxxxx
51for bird in birds:
2 bird.think(myPipes)
3 bird.update()
4 bird.display()
5 print(bird.collide(myPipes[0]))
在所有birds
中,加係bird.think()
。運行後,兩隻鳥會隨機不停上升或不停下降。
inputs
Stuffs.py
:
xxxxxxxxxx
771from nn import NeuralNetwork
2
3class Bird:
4 def __init__(self):
5 self.score = 0
6 self.pipeCounter = 0
7 self.birdPos = PVector(50, height/2)
8 self.birdVec = PVector(0, 0)
9 self.birdAcc = PVector(0, 0.3)
10 self.brain = NeuralNetwork(4, 4, 1)
11
12 def think(self, pipes):
13 inputs = []
14
15 inputs.append(map(self.birdPos.y, 0, height, 0, 1))
16 if not (self.birdIsPass(pipes[0])):
17 inputs.append(map(pipes[0].y, 0, height, 0 ,1))
18 inputs.append(map((pipes[0].y - pipes[0].gap), 0, height, 0, 1))
19 inputs.append(map(pipes[0].x, 0, width, 0, 1))
20 else:
21 inputs.append(map(pipes[1].y, 0, height, 0 ,1))
22 inputs.append(map((pipes[1].y - pipes[1].gap), 0, height, 0, 1))
23 inputs.append(map(pipes[1].x, 0, width, 0, 1))
24
25 print(self.birdIsPass(pipes[0]))
26 for i in inputs:
27 print(i)
28
29 output = self.brain.predict(inputs)
30 if output[0] > 0.5:
31 self.jump()
32
33 def update(self):
34 self.birdVec.add(self.birdAcc)
35 self.birdPos.add(self.birdVec)
36 self.score += 1
37
38 def jump(self):
39 self.birdVec.y = -8
40
41 def display(self):
42 fill("#D5BB06")
43 ellipse(self.birdPos.x, self.birdPos.y, 25, 25)
44
45 def collide(self, _pipe):
46 R = 25 / 2
47 X = self.birdPos.x
48 Y = self.birdPos.y
49 if (X + R > _pipe.x and X - R < _pipe.x + _pipe.w):
50 if (Y + R > _pipe.y or Y - R < _pipe.y - _pipe.gap):
51 return True
52 if (Y > height):
53 return True
54 return False
55
56 def birdIsPass(self, _pipe):
57 if (self.birdPos.x > _pipe.x):
58 return True
59 return False
60
61class Pipe:
62 x = y = h = 0
63 w = 80
64 gap = 80
65
66 def __init__(self):
67 self.x = width + self.w
68 self.y = random(100, height - 100)
69 self.h = height
70
71 def update(self, _panSpeed):
72 self.x -= _panSpeed
73
74 def display(self):
75 fill(0, 204, 0)
76 rect(self.x, self.y, self.w, self.h)
77 rect(self.x, self.y - self.gap, self.w, -self.h)
在這段program中,首先在Bird
class中,加入birdIsPass()
:
xxxxxxxxxx
41def birdIsPass(self, _pipe):
2 if (self.birdPos.x > _pipe.x):
3 return True
4 return False
來判斷鳥是否通過了第一條水管。
再在上面的think()
中,加入有意思的參數。
xxxxxxxxxx
201def think(self, pipes):
2 inputs = []
3
4 inputs.append(map(self.birdPos.y, 0, height, 0, 1))
5 if not (self.birdIsPass(pipes[0])):
6 inputs.append(map(pipes[0].y, 0, height, 0 ,1))
7 inputs.append(map((pipes[0].y - pipes[0].gap), 0, height, 0, 1))
8 inputs.append(map(pipes[0].x, 0, width, 0, 1))
9 else:
10 inputs.append(map(pipes[1].y, 0, height, 0 ,1))
11 inputs.append(map((pipes[1].y - pipes[1].gap), 0, height, 0, 1))
12 inputs.append(map(pipes[1].x, 0, width, 0, 1))
13
14 print(self.birdIsPass(pipes[0]))
15 for i in inputs:
16 print(i)
17
18 output = self.brain.predict(inputs)
19 if output[0] > 0.5:
20 self.jump()
這裡加入了鳥本身的y座標,鳥前方水管的頂和底水管的座標,還有水管的x座標。
運行後,你可以看到,鳥是否可以通過了第一條水管,和上面幾個inputs
的參數。
flappy_bird.pyde
:
xxxxxxxxxx
461from Stuffs import *
2from GeneticAlgorithm import *
3
4population = 500
5panSpeed = 5
6birds = []
7myPipes = []
8
9def setup():
10 global birds, myPipes
11 size(800, 600)
12 frameRate(60)
13 for i in range(population):
14 birds.append(Bird())
15 myPipes = [Pipe()]
16
17def draw():
18 global birds, myPipes
19
20 if (frameCount % 100 == 0):
21 myPipes.append(Pipe())
22 if (myPipes[0].x < -myPipes[0].w):
23 myPipes.pop(0)
24
25 background("#70C6D5")
26 for bird in birds:
27 bird.think(myPipes)
28 bird.update()
29 bird.display()
30 if bird.collide(myPipes[0]):
31 birds.remove(bird)
32 print(len(birds))
33 if (len(birds) == 0):
34 nextGeneration(birds, population)
35
36 for pipe in myPipes:
37 pipe.update(panSpeed)
38 pipe.display()
39
40def keyPressed():
41 if (key == ' '):
42 birds[0].jump()
43 if (key == 'W' or key == 'w'):
44 birds[1].jump()
45 if (key == 'R' or key == 'r'):
46 setup()
Stuffs.py
:
xxxxxxxxxx
731from nn import NeuralNetwork
2
3class Bird:
4 def __init__(self):
5 self.score = 0
6 self.pipeCounter = 0
7 self.birdPos = PVector(50, height/2)
8 self.birdVec = PVector(0, 0)
9 self.birdAcc = PVector(0, 0.3)
10 self.brain = NeuralNetwork(4, 4, 1)
11
12 def think(self, pipes):
13 inputs = []
14
15 inputs.append(map(self.birdPos.y, 0, height, 0, 1))
16 if not (self.birdIsPass(pipes[0])):
17 inputs.append(map(pipes[0].y, 0, height, 0 ,1))
18 inputs.append(map((pipes[0].y - pipes[0].gap), 0, height, 0, 1))
19 inputs.append(map(pipes[0].x, 0, width, 0, 1))
20 else:
21 inputs.append(map(pipes[1].y, 0, height, 0 ,1))
22 inputs.append(map((pipes[1].y - pipes[1].gap), 0, height, 0, 1))
23 inputs.append(map(pipes[1].x, 0, width, 0, 1))
24
25 output = self.brain.predict(inputs)
26 if output[0] > 0.5:
27 self.jump()
28
29 def update(self):
30 self.birdVec.add(self.birdAcc)
31 self.birdPos.add(self.birdVec)
32 self.score += 1
33
34 def jump(self):
35 self.birdVec.y = -8
36
37 def display(self):
38 fill("#D5BB06")
39 ellipse(self.birdPos.x, self.birdPos.y, 25, 25)
40
41 def collide(self, _pipe):
42 R = 25 / 2
43 X = self.birdPos.x
44 Y = self.birdPos.y
45 if (X + R > _pipe.x and X - R < _pipe.x + _pipe.w):
46 if (Y + R > _pipe.y or Y - R < _pipe.y - _pipe.gap):
47 return True
48 if (Y > height):
49 return True
50 return False
51
52 def birdIsPass(self, _pipe):
53 if (self.birdPos.x > _pipe.x):
54 return True
55 return False
56
57class Pipe:
58 x = y = h = 0
59 w = 80
60 gap = 80
61
62 def __init__(self):
63 self.x = width + self.w
64 self.y = random(100, height - 100)
65 self.h = height
66
67 def update(self, _panSpeed):
68 self.x -= _panSpeed
69
70 def display(self):
71 fill(0, 204, 0)
72 rect(self.x, self.y, self.w, self.h)
73 rect(self.x, self.y - self.gap, self.w, -self.h)
GeneticAlgorithm.py
:
xxxxxxxxxx
51from Stuffs import *
2
3def nextGeneration(birds, _population):
4 for i in range(_population):
5 birds.append(Bird())
第一步,先將原本在Stuff.py
中,列印inputs
的值都刪去,免得一次過列印太多東西。
之後,回到主程式,
在最上方加入:
xxxxxxxxxx
31from GeneticAlgorithm import *
2
3population = 500
導入新的一個叫GeneticAlgorithm
的python檔案和一次過製作500隻鳥。
在setup()
中,將原本的birds = [Bird(), Bird()]
,轉成:
xxxxxxxxxx
21for i in range(population):
2 birds.append(Bird())
在主程中draw()
中,將所有鳥的顯示和更新等,轉成:
xxxxxxxxxx
91for bird in birds:
2 bird.think(myPipes)
3 bird.update()
4 bird.display()
5 if bird.collide(myPipes[0]):
6 birds.remove(bird)
7print(len(birds))
8if (len(birds) == 0):
9 nextGeneration(birds, population)
加入指令,只要撞到水管或跌出畫面,就將這隻鳥,從birds
中刪除,最後再觀察現時有多少隻鳥。在一次過500隻的情況下,總會有幾隻能夠捱得到不出畫面外。
如果全部鳥都被刪除,就重生下一個世代。
開一個叫GeneticAlgorithm.py
的新tag,加入:
xxxxxxxxxx
51from Stuffs import *
2
3def nextGeneration(birds, _population):
4 for i in range(_population):
5 birds.append(Bird())
這個動作,只是重新製作新一代的人口。
遺傳演算法的運作方式類似於生物進化的過程。它以一個稱為"基因型"的編碼來表示問題的解,並使用一組稱為"個體"的基因型來形成一個"族群"。每個個體都對應於問題的一個可能解。
遺傳演算法的運行過程通常包括以下步驟:
初始化:隨機生成一個初始個體族群。
適應度評估:對於每個個體,根據問題的目標函數計算其適應度,評估其解的品質。
選擇:根據適應度的大小,選擇一些優秀的個體作為下一代的父母。
交叉:將選擇的父母進行交叉操作,生成子代個體。
變異:對子代進行突變操作,引入一些隨機性,以增加搜索空間的探索能力。
更新族群:將子代個體與父代個體結合,形成新的族群。
重複執行步驟2至6,直到滿足停止條件(例如達到最大迭代次數或找到足夠好的解)。
flappy_bird.pyde
:
xxxxxxxxxx
521from Stuffs import *
2from GeneticAlgorithm import *
3
4population = 500
5panSpeed = 5
6birds = []
7allBirds = []
8bestBird = None
9myPipes = []
10counter = 0
11
12def setup():
13 global birds, myPipes, allBirds
14 size(800, 600)
15 frameRate(60)
16 for i in range(population):
17 birds.append(Bird())
18
19 myPipes = [Pipe()]
20
21def draw():
22 global birds, myPipes, counter, allBirds
23
24 counter += 1
25 if (counter % 100 == 0):
26 myPipes.append(Pipe())
27 if (myPipes[0].x < -myPipes[0].w):
28 myPipes.pop(0)
29
30 background("#70C6D5")
31 for bird in birds:
32 bird.think(myPipes)
33 bird.update()
34 bird.display()
35 if bird.collide(myPipes[0]):
36 allBirds.append(bird)
37 birds.remove(bird)
38
39 if (len(birds) == 0):
40 birds = nextGeneration(allBirds, population)
41
42 for pipe in myPipes:
43 pipe.update(panSpeed)
44 pipe.display()
45
46def keyPressed():
47 if (key == ' '):
48 birds[0].jump()
49 if (key == 'W' or key == 'w'):
50 birds[1].jump()
51 if (key == 'R' or key == 'r'):
52 setup()
Stuffs.py
:
xxxxxxxxxx
791from nn import NeuralNetwork
2
3class Bird:
4 def __init__(self):
5 self.score = 0
6 self.fitness = 0
7 self.pipeCounter = 0
8 self.birdPos = PVector(50, height/2)
9 self.birdVec = PVector(0, 0)
10 self.birdAcc = PVector(0, 0.3)
11 self.brain = NeuralNetwork(4, 4, 1)
12
13 def think(self, pipes):
14 inputs = []
15
16 inputs.append(map(self.birdPos.y, 0, height, 0, 1))
17 if not (self.birdIsPass(pipes[0])):
18 inputs.append(map(pipes[0].y, 0, height, 0 ,1))
19 inputs.append(map((pipes[0].y - pipes[0].gap), 0, height, 0, 1))
20 inputs.append(map(pipes[0].x, 0, width, 0, 1))
21 else:
22 inputs.append(map(pipes[1].y, 0, height, 0 ,1))
23 inputs.append(map((pipes[1].y - pipes[1].gap), 0, height, 0, 1))
24 inputs.append(map(pipes[1].x, 0, width, 0, 1))
25
26 output = self.brain.predict(inputs)
27 if output[0] > 0.5:
28 self.jump()
29
30 def update(self):
31 self.birdVec.add(self.birdAcc)
32 self.birdPos.add(self.birdVec)
33 self.score += 1
34
35 def jump(self):
36 self.birdVec.y = -8
37
38 def display(self):
39 fill("#D5BB06")
40 ellipse(self.birdPos.x, self.birdPos.y, 25, 25)
41
42 def collide(self, _pipe):
43 R = 25 / 2
44 X = self.birdPos.x
45 Y = self.birdPos.y
46 if (X + R > _pipe.x and X - R < _pipe.x + _pipe.w):
47 if (Y + R > _pipe.y or Y - R < _pipe.y - _pipe.gap):
48 return True
49 if (Y > height):
50 return True
51 return False
52
53 def birdIsPass(self, _pipe):
54 if (self.birdPos.x > _pipe.x):
55 return True
56 return False
57
58 def copy(self):
59 copyBird = Bird()
60 copyBird.brain = self.brain.copy()
61 return copyBird
62
63class Pipe:
64 x = y = h = 0
65 w = 80
66 gap = 180
67
68 def __init__(self):
69 self.x = width + self.w
70 self.y = random(100, height - 100)
71 self.h = height
72
73 def update(self, _panSpeed):
74 self.x -= _panSpeed
75
76 def display(self):
77 fill(0, 204, 0)
78 rect(self.x, self.y, self.w, self.h)
79 rect(self.x, self.y - self.gap, self.w, -self.h)
GeneticAlgorithm.py
:
xxxxxxxxxx
601from Stuffs import *
2import random
3
4def nextGeneration(allBirds, population):
5 global bestBird
6 newBirds = []
7 bestBird = findBestBird(allBirds)
8 resetGame()
9 normalizeFitness(allBirds)
10 activeBirds = generate(allBirds)
11 for i in range(population):
12 newBirds.append(activeBirds[i])
13 allBirds = []
14 print(len(newBirds))
15 return newBirds
16
17def normalizeFitness(birds):
18 # power all the score of birds
19 for bird in birds:
20 bird.score = pow(bird.score, 2)
21 sum = 0
22 for bird in birds:
23 sum += bird.score
24 for bird in birds:
25 bird.fitness = bird.score / sum
26
27def findBestBird(allBirds):
28 maxScore = 0
29 _bestBird = None
30 for bird in allBirds:
31 if bird.score > maxScore:
32 maxScore = bird.score
33 _bestBird = bird
34 return _bestBird
35
36def resetGame():
37 global myPipes, counter, bestBird
38
39 myPipes = []
40 counter = 0
41 if (bestBird != None):
42 bestBird.score = 0
43
44def generate(oldBirds):
45 newBirds = []
46 for i in range(len(oldBirds)):
47 bird = poolSelection(oldBirds)
48 newBirds.append(bird)
49 return newBirds
50
51def poolSelection(birds):
52 index = 0
53 r = random.random()
54 while (r > 0):
55 r -= birds[index].fitness
56 index += 1
57 index -= 1
58 bird = birds[index]
59 child = bird.copy()
60 return child
以上的程式碼,做了基因演算法的前三步。我們建立了一個500隻鳥的群組,為每一隻鳥計分,紀錄鳥生存了多久,然後我們根據每隻鳥的fitness
去抽選這隻鳥是否能遺傳下去有下一代。
當我們運行這個程式後,會發現過了幾代後,幾乎所有的鳥都只會有同一個行為,500隻鳥會疊在一起,這是因為這些鳥只有根據分數決定下一代,但沒有做交叉基因,所以幾代後,所有的鳥只會有同一個單一基因,所以行為完全一模一樣。
為方便展示效果,程式做了較多修改,我將全部都貼上來。共有5個檔案。
flappy_bird_AI.pyde
:
xxxxxxxxxx
721from Stuffs import *
2from GeneticAlgorithm import *
3from Matrix import *
4
5population = 500
6panSpeed = 5
7birds = []
8allBirds = []
9bestBird = None
10myPipes = []
11counter = 0
12frame_count = 0
13generation = 0
14
15def setup():
16
17 global birds, myPipes, allBirds, generation, counter, frame_count, bestBird
18 size(800, 600)
19 frameRate(60)
20 for i in range(population):
21 birds.append(Bird())
22
23 myPipes = [Pipe()]
24 allBirds = []
25 bestBird = None
26 generation = 1
27 counter = 0
28 frame_count = 0
29
30def draw():
31
32 global birds, myPipes, counter, allBirds, frame_count, generation
33
34 counter += 1
35 if (counter % 100 == 0):
36 myPipes.append(Pipe())
37 if (myPipes[0].x < -myPipes[0].w):
38 myPipes.pop(0)
39
40 background("#70C6D5")
41 for bird in birds:
42 bird.think(myPipes)
43 bird.update()
44 bird.display()
45 if bird.collide(myPipes[0]):
46 allBirds.append(bird)
47 birds.remove(bird)
48
49 if (len(birds) == 0):
50 frame_count += 1
51 if frame_count >= 16:
52 print("new generation")
53 generation += 1
54 birds = nextGeneration(allBirds, population)
55 allBirds = []
56 frame_count = 0
57
58 for pipe in myPipes:
59 pipe.update(panSpeed)
60 pipe.display()
61
62 fill(255)
63 text("Population: "+str(len(birds)), 20, 20)
64 text("Generation: "+str(generation), 20, 40)
65
66# def keyPressed():
67# if (key == ' '):
68# birds[0].jump()
69# if (key == 'W' or key == 'w'):
70# birds[1].jump()
71# if (key == 'R' or key == 'r'):
72# setup()
GeneticAlgorithm.py
:
xxxxxxxxxx
731from Stuffs import *
2import random
3
4def nextGeneration(allBirds, population):
5 global bestBird
6 newBirds = []
7 bestBird = findBestBird(allBirds)
8 resetGame()
9 normalizeFitness(allBirds)
10 activeBirds = generate(allBirds)
11 for i in range(population):
12 newBirds.append(activeBirds[i])
13
14 return newBirds
15
16def normalizeFitness(birds):
17 # power all the score of birds
18 for bird in birds:
19 bird.score = pow(bird.score, 2)
20 sum = 0
21 for bird in birds:
22 sum += bird.score
23 for bird in birds:
24 bird.fitness = bird.score / sum
25
26def findBestBird(allBirds):
27 maxScore = 0
28 _bestBird = None
29 for bird in allBirds:
30 if bird.score > maxScore:
31 maxScore = bird.score
32 _bestBird = bird
33 return _bestBird
34
35def resetGame():
36 global myPipes, counter, bestBird
37
38 myPipes = []
39 counter = 0
40
41 if (bestBird != None):
42 bestBird.score = 0
43
44def crossover(bird1, bird2):
45 child = Bird()
46 child.brain = NeuralNetwork(4,4,1)
47
48 child.brain.weights_ih = bird1.brain.weights_ih.crossover(bird2.brain.weights_ih)
49 child.brain.weights_ho = bird1.brain.weights_ho.crossover(bird2.brain.weights_ho)
50 child.brain.bias_h = bird1.brain.bias_h.crossover(bird2.brain.bias_h)
51 child.brain.bias_o = bird1.brain.bias_o.crossover(bird2.brain.bias_o)
52
53 return child
54
55def generate(oldBirds):
56 newBirds = []
57 for i in range(len(oldBirds)):
58 bird1 = poolSelection(oldBirds)
59 bird2 = poolSelection(oldBirds)
60 child = crossover(bird1, bird2)
61 newBirds.append(child)
62 return newBirds
63
64def poolSelection(birds):
65 index = 0
66 r = random.random()
67 while (r > 0):
68 r -= birds[index].fitness
69 index += 1
70 index -= 1
71 bird = birds[index]
72 child = bird.copy()
73 return child
Matrix.py
:
xxxxxxxxxx
1731import random
2
3class Matrix():
4
5 def __init__(self, rows, columns):
6 self.rows = rows
7 self.columns = columns
8 self.data = [[0 for x in range(columns)] for y in range(rows)]
9
10 def _print(self):
11 for row in self.data:
12 print(row)
13 print('\n')
14
15 def copy(self):
16 m = Matrix(self.rows, self.columns)
17 for i in range(self.rows):
18 for j in range(self.columns):
19 m.data[i][j] = self.data[i][j]
20 return m
21
22
23 def fromArray(cls, array):
24 m = Matrix(len(array), 1)
25 for i in range(len(array)):
26 m.data[i][0] = array[i]
27 return m
28
29 def toArray(self):
30 array = []
31 for i in range(self.rows):
32 for j in range(self.columns):
33 array.append(self.data[i][j])
34 return array
35
36 def add(self, n, b=None):
37 if b is None:
38 if isinstance(n, Matrix):
39 if self.rows != n.rows or self.columns != n.columns :
40 print("Columns and Rows of A must match Columns and Rows of B.")
41 return
42 result = Matrix(self.rows, self.columns)
43 for i in range(self.rows):
44 for j in range(self.columns):
45 result.data[i][j] = self.data[i][j] + n.data[i][j]
46 else:
47 result = Matrix(self.rows, self.columns)
48 for i in range(self.rows):
49 for j in range(self.columns):
50 result.data[i][j] = self.data[i][j] + n
51 else:
52 if isinstance(n, Matrix) and isinstance(b, Matrix):
53 if n.rows != b.rows or n.columns != b.columns:
54 print("Columns and Rows of A, B and C must match.")
55 return
56 result = Matrix(n.rows, n.columns)
57 for i in range(n.rows):
58 for j in range(n.columns):
59 result.data[i][j] = n.data[i][j] + b.data[i][j]
60 elif isinstance(n, Matrix) and not isinstance(b, Matrix):
61 result = Matrix(n.rows, n.columns)
62 for i in range(n.rows):
63 for j in range(n.columns):
64 result.data[i][j] = n.data[i][j] + b
65 return result
66
67 def subtract(self, n, b = None):
68 if b is None:
69 if isinstance(n, Matrix):
70 return self.add(n.multiply(-1))
71 else:
72 return self.add(-n)
73 else:
74 if isinstance(n, Matrix) and isinstance(b, Matrix):
75 return n.add(b.multiply(-1))
76 elif isinstance(n, Matrix) and not isinstance(b, Matrix):
77 return n.add(-b)
78
79
80 def randomize(self):
81 for i in range(self.rows):
82 for j in range(self.columns):
83 self.data[i][j] = random.uniform(-1, 1)
84
85 def transpose(self):
86 result = Matrix(self.columns, self.rows)
87 for i in range(self.rows):
88 for j in range(self.columns):
89 result.data[j][i] = self.data[i][j]
90 return result
91
92 def multiply(self, n, b = None):
93 if b is None:
94 # check if n is a matrix or a scalar
95 if isinstance(n, Matrix):
96 if self.columns != n.rows:
97 print("Columns of A must match rows of B.")
98 return
99 result = Matrix(self.rows, n.columns)
100 for i in range(result.rows):
101 for j in range(result.columns):
102 sum = 0
103 for k in range(self.columns):
104 sum += self.data[i][k] * n.data[k][j]
105 result.data[i][j] = sum
106 return result
107 else:
108 result = Matrix(self.rows, self.columns)
109 for i in range(self.rows):
110 for j in range(self.columns):
111 result.data[i][j] = self.data[i][j] * n
112 return result
113 else:
114 if isinstance(n, Matrix) and isinstance(b, Matrix):
115 if n.columns != b.rows:
116 print("Columns of A must match rows of B.")
117 return
118 result = Matrix(n.rows, b.columns)
119 for i in range(result.rows):
120 for j in range(result.columns):
121 sum = 0
122 for k in range(n.columns):
123 sum += n.data[i][k] * b.data[k][j]
124 result.data[i][j] = sum
125 return result
126 elif isinstance(n, Matrix) and not isinstance(b, Matrix):
127 result = Matrix(n.rows, n.columns)
128 for i in range(n.rows):
129 for j in range(n.columns):
130 result.data[i][j] = n.data[i][j] * b
131 return result
132
133 def hadamard_product(self, n, b = None):
134 if b is None:
135 if self.rows != n.rows or self.columns != n.columns:
136 print("Columns and Rows of A must match Columns and Rows of B.")
137 return
138 result = Matrix(self.rows, self.columns)
139 for i in range(result.rows):
140 for j in range(result.columns):
141 result.data[i][j] = self.data[i][j] * n.data[i][j]
142 return result
143 else:
144 if n.rows != b.rows or n.columns != b.columns:
145 print("Columns and Rows of A, B and C must match.")
146 return
147 result = Matrix(n.rows, n.columns)
148 for i in range(result.rows):
149 for j in range(result.columns):
150 result.data[i][j] = n.data[i][j] * b.data[i][j]
151 return result
152
153 def map(self, func):
154 for i in range(self.rows):
155 for j in range(self.columns):
156 val = self.data[i][j]
157 self.data[i][j] = func(val)
158 return self
159
160 def crossover(self, other):
161 crossoverPoint = random.randint(0, self.rows * self.columns)
162 childData = []
163 for i in range(self.rows):
164 childRow = []
165 for j in range(self.columns):
166 if i * self.columns + j < crossoverPoint:
167 childRow.append(self.data[i][j])
168 else:
169 childRow.append(other.data[i][j])
170 childData.append(childRow)
171 child = Matrix(self.rows, self.columns)
172 child.data = childData
173 return child
Stuffs.py
:
xxxxxxxxxx
811from nn import NeuralNetwork
2
3class Bird:
4 def __init__(self):
5 self.score = 0
6 self.fitness = 0
7 self.pipeCounter = 0
8 self.birdPos = PVector(50, height/2)
9 self.birdVec = PVector(0, 0)
10 self.birdAcc = PVector(0, 0.3)
11 self.brain = NeuralNetwork(4, 4, 1)
12
13 def think(self, pipes):
14 inputs = []
15
16 inputs.append(map(self.birdPos.y, 0, height, 0, 1))
17 if not (self.birdIsPass(pipes[0])):
18 inputs.append(map(pipes[0].y, 0, height, 0 ,1))
19 inputs.append(map((pipes[0].y - pipes[0].gap), 0, height, 0, 1))
20 inputs.append(map(pipes[0].x, 0, width, 0, 1))
21 else:
22 inputs.append(map(pipes[1].y, 0, height, 0 ,1))
23 inputs.append(map((pipes[1].y - pipes[1].gap), 0, height, 0, 1))
24 inputs.append(map(pipes[1].x, 0, width, 0, 1))
25
26 output = self.brain.predict(inputs)
27 if output[0] > 0.5:
28 self.jump()
29
30 def update(self):
31 self.birdVec.add(self.birdAcc)
32 self.birdPos.add(self.birdVec)
33 self.score += 1
34
35 def jump(self):
36 self.birdVec.y = -8
37
38 def display(self):
39 fill("#D5BB06")
40 ellipse(self.birdPos.x, self.birdPos.y, 25, 25)
41 fill("#FF0000")
42 text(self.score, self.birdPos.x, self.birdPos.y - 20)
43
44 def collide(self, _pipe):
45 R = 25 / 2
46 X = self.birdPos.x
47 Y = self.birdPos.y
48 if (X + R > _pipe.x and X - R < _pipe.x + _pipe.w):
49 if (Y + R > _pipe.y or Y - R < _pipe.y - _pipe.gap):
50 return True
51 if (Y > height):
52 return True
53 return False
54
55 def birdIsPass(self, _pipe):
56 if (self.birdPos.x > _pipe.x):
57 return True
58 return False
59
60 def copy(self):
61 copyBird = Bird()
62 copyBird.brain = self.brain.copy()
63 return copyBird
64
65class Pipe:
66 x = y = h = 0
67 w = 80
68 gap = 280
69
70 def __init__(self):
71 self.x = width + self.w
72 self.y = random(100, height - 100)
73 self.h = height
74
75 def update(self, _panSpeed):
76 self.x -= _panSpeed
77
78 def display(self):
79 fill(0, 204, 0)
80 rect(self.x, self.y, self.w, self.h)
81 rect(self.x, self.y - self.gap, self.w, -self.h)
nn.py
:
xxxxxxxxxx
1341from Matrix import *
2import math
3
4class ActivationFunction:
5 def __init__(self, func, dfunc):
6 self.func = func
7 self.dfunc = dfunc
8
9def sigmoid_func(x):
10 return 1 / (1 + math.exp(-x))
11
12def sigmoid_dfunc(y):
13 return y * (1 - y)
14
15sigmoid = ActivationFunction(sigmoid_func, sigmoid_dfunc)
16
17def tanh_func(x):
18 return math.tanh(x)
19
20def tanh_dfunc(y):
21 return 1 - (y * y)
22
23tanh = ActivationFunction(tanh_func, tanh_dfunc)
24
25class NeuralNetwork:
26 def __init__(self, a, b=None, c=None):
27 if isinstance(a, NeuralNetwork):
28 self.input_nodes = a.input_nodes
29 self.hidden_nodes = a.hidden_nodes
30 self.output_nodes = a.output_nodes
31
32 self.weights_ih = a.weights_ih.copy()
33 self.weights_ho = a.weights_ho.copy()
34
35 self.bias_h = a.bias_h.copy()
36 self.bias_o = a.bias_o.copy()
37 else:
38 self.input_nodes = a
39 self.hidden_nodes = b
40 self.output_nodes = c
41
42 self.weights_ih = Matrix(self.hidden_nodes, self.input_nodes)
43 self.weights_ho = Matrix(self.output_nodes, self.hidden_nodes)
44 self.weights_ih.randomize()
45 self.weights_ho.randomize()
46
47 self.bias_h = Matrix(self.hidden_nodes, 1)
48 self.bias_o = Matrix(self.output_nodes, 1)
49 self.bias_h.randomize()
50 self.bias_o.randomize()
51
52 self.setLearningRate(0.1)
53 self.setActivationFunction(sigmoid)
54
55 def predict(self, input_array):
56 # Generating the Hidden Outputs
57 inputs = Matrix.fromArray(input_array)
58 hidden = Matrix.multiply(self.weights_ih, inputs)
59 hidden = hidden.add(self.bias_h)
60 # activation function!
61 hidden.map(self.activation_function.func)
62
63 # Generating the output's output!
64 output = Matrix.multiply(self.weights_ho, hidden)
65 output = output.add(self.bias_o)
66 output.map(self.activation_function.func)
67
68 # Sending back to the caller!
69 return output.toArray()
70
71 def setLearningRate(self, learning_rate):
72 self.learning_rate = learning_rate
73
74 def setActivationFunction(self, func):
75 self.activation_function = func
76
77 def train(self, input_array, target_array):
78 # Generating the Hidden Outputs
79 inputs = Matrix.fromArray(input_array)
80 hidden = Matrix.multiply(self.weights_ih, inputs)
81 hidden = hidden.add(self.bias_h)
82 # activation function!
83 hidden.map(self.activation_function.func)
84 # Generating the output's output!
85 outputs = Matrix.multiply(self.weights_ho, hidden)
86 outputs = outputs.add(self.bias_o)
87 outputs.map(self.activation_function.func)
88
89 # Convert array to matrix object
90 targets = Matrix.fromArray(target_array)
91
92 # Calculate the error
93 # ERROR = TARGETS - OUTPUTS
94 output_errors = Matrix.subtract(targets, outputs)
95
96 # let gradient = outputs * (1 - outputs);
97 # Calculate gradient
98 gradients = Matrix.map(outputs, self.activation_function.dfunc)
99 gradients = gradients.hadamard_product(output_errors)
100 gradients = gradients.multiply(self.learning_rate)
101
102 # Calculate deltas
103 hidden_T = Matrix.transpose(hidden)
104 weight_ho_deltas = Matrix.multiply(gradients, hidden_T)
105 # Adjust the weights by deltas
106 self.weights_ho = self.weights_ho.add(weight_ho_deltas)
107 # Adjust the bias by its deltas (which is just the gradients)
108 self.bias_o = self.bias_o.add(gradients)
109
110 # Calculate the hidden layer errors
111 who_t = Matrix.transpose(self.weights_ho)
112 hidden_errors = Matrix.multiply(who_t, output_errors)
113
114 # Calculate hidden gradient
115 hidden_gradient = Matrix.map(hidden, self.activation_function.dfunc)
116 hidden_gradient = hidden_gradient.hadamard_product(hidden_errors)
117 hidden_gradient = hidden_gradient.multiply(self.learning_rate)
118
119 # Calcuate input->hidden deltas
120 inputs_T = Matrix.transpose(inputs)
121 weight_ih_deltas = Matrix.multiply(hidden_gradient, inputs_T)
122
123 self.weights_ih = self.weights_ih.add(weight_ih_deltas)
124 # Adjust the bias by its deltas (which is just the gradients)
125 self.bias_h = self.bias_h.add(hidden_gradient)
126
127 def copy(self):
128 return NeuralNetwork(self)
129
130 def mutate(self, func):
131 self.weights_ih.map(func)
132 self.weights_ho.map(func)
133 self.bias_o.map(func)
134 self.bias_h.map(func)
為方便和調試,我將遊戲的難度降底了,可以看到,去到第12代後,在沒有設定timeout的情況下,最終有20隻鳥有40000分以上。
這裡,我主要在Matrix
class中加入了crossover
功能,讓矩陣能夠交換內容,之後也在GeneticAlgorithm.py
中,將鳥交換基因,令到每一代都能跟隨上一代的父母交換基因特徵。
但是否每一次都能成功呢?當然不是,就算生物在進化時,有些物種不能適應環境,就算勉強繁演下去,也會全族滅亡,舉個例子,如果這500隻鳥也沒有跳過水管的能力,那麼它們的基因傳下去,也沒有這個能力的。就像近親繁殖一樣,如果找不到新的基因,太相似的基因一路繁演下去,就會變得很單一,一個可能是對這個水管十分熟識,整個族群都能拿到高分,一個可能是對環境十分不熟識,全部群眾都很低分。但就算是前者, 也會因為對環境十分熟悉,這時如果將水管的開口變窄,就會完全不適應而全族滅亡。為了應該這個問題,便需要在遺傳中,加入「基因變異(mutation)」。
GeneticAlgorithm.py
:
xxxxxxxxxx
811from Stuffs import *
2import random
3
4def nextGeneration(allBirds, population):
5 global bestBird
6 newBirds = []
7 bestBird = findBestBird(allBirds)
8 resetGame()
9 normalizeFitness(allBirds)
10 activeBirds = generate(allBirds)
11 for i in range(population):
12 newBirds.append(activeBirds[i])
13
14 return newBirds
15
16def normalizeFitness(birds):
17 # power all the score of birds
18 for bird in birds:
19 bird.score = pow(bird.score, 2)
20 sum = 0
21 for bird in birds:
22 sum += bird.score
23 for bird in birds:
24 bird.fitness = bird.score / sum
25
26def findBestBird(allBirds):
27 maxScore = 0
28 _bestBird = None
29 for bird in allBirds:
30 if bird.score > maxScore:
31 maxScore = bird.score
32 _bestBird = bird
33 return _bestBird
34
35def resetGame():
36 global myPipes, counter, bestBird
37
38 myPipes = []
39 counter = 0
40
41 if (bestBird != None):
42 bestBird.score = 0
43
44def crossover(bird1, bird2):
45 child = Bird()
46 child.brain = NeuralNetwork(4,4,1)
47
48 child.brain.weights_ih = bird1.brain.weights_ih.crossover(bird2.brain.weights_ih)
49 child.brain.weights_ho = bird1.brain.weights_ho.crossover(bird2.brain.weights_ho)
50 child.brain.bias_h = bird1.brain.bias_h.crossover(bird2.brain.bias_h)
51 child.brain.bias_o = bird1.brain.bias_o.crossover(bird2.brain.bias_o)
52
53 return child
54
55def mutation_func(x):
56 mutation_rate = 0.002
57 if random.random() < mutation_rate:
58 return x + random.gauss(0, 0.05)
59 else:
60 return x
61
62def generate(oldBirds):
63 newBirds = []
64 for i in range(len(oldBirds)):
65 bird1 = poolSelection(oldBirds)
66 bird2 = poolSelection(oldBirds)
67 child = crossover(bird1, bird2)
68 child.brain.mutate(mutation_func)
69 newBirds.append(child)
70 return newBirds
71
72def poolSelection(birds):
73 index = 0
74 r = random.random()
75 while (r > 0):
76 r -= birds[index].fitness
77 index += 1
78 index -= 1
79 bird = birds[index]
80 child = bird.copy()
81 return child
今次只有一個細節位有改變。
在這裡,我們加入了一個mutation_func
,用來令神經網路改變。之後在原本的generate
中,在基因交換後,加入child.brain.mutate(mutation_func)
令基因變異。
xxxxxxxxxx
161def mutation_func(x):
2 mutation_rate = 0.002
3 if random.random() < mutation_rate:
4 return x + random.gauss(0, 0.05)
5 else:
6 return x
7
8def generate(oldBirds):
9 newBirds = []
10 for i in range(len(oldBirds)):
11 bird1 = poolSelection(oldBirds)
12 bird2 = poolSelection(oldBirds)
13 child = crossover(bird1, bird2)
14 child.brain.mutate(mutation_func)
15 newBirds.append(child)
16 return newBirds
這裡有2個數值得們我們關注:
mutation_rate = 0.002
: 是變異率,0.002
大約是500隻當中,有一隻會有變異,這個其實已經相當高,人類的變異率,是
x + random.gauss(0, 0.05)
: 另一個是我們的變異函數,在發生變異時,有多大程度影響這個基因,如果太大的話,會令整個基因產生翻天覆地的變化,由貓變異做狗,但如果太少的話,對效果又不明顯。
來到這裡,這個程式已經完成,但也有不少改良空間。
鳥的大腦,中間隱藏層只有4個神經源,在The Coding Train的例子中,神經網路不是我們的NeuralNetwork(4, 4, 1)
,即4個輸入變量,4層隱藏層和1個輸出,而是NeuralNetwork(5, 8, 2)
,他的例子有2個輸出,如果輸出1少於輸出2,則鳥就會跳,令神經網路的複雜度增加,你試試先將神經網路轉成NeuralNetwork(4, 16, 1)
,看看鳥有沒有更聰明。之後再試著變成NeuralNetwork(4, 8, 2)
。
在原例子中,神經網路是NeuralNetwork(5, 8, 2)
,因為輸入的考慮因素,還有一個是birdVec
,試著將birdVec.y
加入變為考慮: inputs.append( map(this.birdVec.y, -5, 5, 0, 1))
鳥的生命沒有限制,令成功生存的鳥一直生存下去,沒有機會變下一世代,不利基因遺傳,試設定一個liftcount,鳥在經過2000個frameCount
後就會滅亡,還入下一個世代。